ea082cd9b8c672040ea5a77e19ed2a6fa9d42e2f
[arvados-workbench2.git] / src / store / process-logs-panel / process-logs-panel-actions.ts
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 import { unionize, ofType, UnionOf } from "common/unionize";
6 import { ProcessLogs } from './process-logs-panel';
7 import { LogEventType } from 'models/log';
8 import { RootState } from 'store/store';
9 import { ServiceRepository } from 'services/services';
10 import { Dispatch } from 'redux';
11 import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
12 import { Process, getProcess } from 'store/processes/process';
13 import { navigateTo } from 'store/navigation/navigation-action';
14 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
15 import { CollectionFile, CollectionFileType } from "models/collection-file";
16
17 const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
18 const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/;
19
20 export const processLogsPanelActions = unionize({
21     RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
22     INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
23     SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
24     ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
25 });
26
27 // Max size of logs to fetch in bytes
28 const maxLogFetchSize: number = 128 * 1000;
29
30 type FileWithProgress = {
31     file: CollectionFile;
32     lastByte: number;
33 }
34
35 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
36
37 export const setProcessLogsPanelFilter = (filter: string) =>
38     processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER(filter);
39
40 export const initProcessLogsPanel = (processUuid: string) =>
41     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
42         dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
43         const process = getProcess(processUuid)(getState().resources);
44         if (process?.containerRequest?.uuid) {
45             // Get log file size info
46             const logFiles = await loadContainerLogFileList(process.containerRequest.uuid, logService);
47
48             // Populate lastbyte 0 for each file
49             const filesWithProgress = logFiles.map((file) => ({file, lastByte: 0}));
50
51             // Fetch array of LogFragments
52             const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);
53
54             // Populate initial state with filters
55             const initialState = createInitialLogPanelState(logFiles, logLines);
56             dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
57         }
58     };
59
60 export const pollProcessLogs = (processUuid: string) =>
61     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
62         try {
63             // Get log panel state and process from store
64             const currentState = getState().processLogsPanel;
65             const process = getProcess(processUuid)(getState().resources);
66
67             // Check if container request is present and initial logs state loaded
68             if (process?.containerRequest?.uuid && Object.keys(currentState.logs).length > 0) {
69                 const logFiles = await loadContainerLogFileList(process.containerRequest.uuid, logService);
70
71                 // Determine byte to fetch from while filtering unchanged files
72                 const filesToUpdateWithProgress = logFiles.reduce((acc, updatedFile) => {
73                     // Fetch last byte or 0 for new log files
74                     const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0;
75
76                     const isNew = !Object.keys(currentState.logs).find((currentStateLogName) => (updatedFile.name.startsWith(currentStateLogName)));
77                     const isChanged = !isNew && currentStateLogLastByte < updatedFile.size;
78
79                     if (isNew || isChanged) {
80                         return acc.concat({file: updatedFile, lastByte: currentStateLogLastByte});
81                     } else {
82                         return acc;
83                     }
84                 }, [] as FileWithProgress[]);
85
86                 // Perform range request(s) for each file
87                 const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);
88
89                 if (logFragments.length) {
90                     // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
91                     const groupedLogs = groupLogs(logFiles, logFragments);
92                     await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
93                 }
94             }
95             return Promise.resolve();
96         } catch (e) {
97             return Promise.reject();
98         }
99     };
100
101 const loadContainerLogFileList = async (containerUuid: string, logService: LogService) => {
102     const logCollectionContents = await logService.listLogFiles(containerUuid);
103
104     // Filter only root directory files matching log event types which have bytes
105     return logCollectionContents.filter((file): file is CollectionFile => (
106         file.type === CollectionFileType.FILE &&
107         file.path === `/arvados/v1/container_requests/${containerUuid}/log` &&
108         PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(logFileToLogType(file)) > -1 &&
109         file.size > 0
110     ));
111 };
112
113 /**
114  * Loads the contents of each file from each file's lastByte simultaneously
115  *   while respecting the maxLogFetchSize by requesting the start and end
116  *   of the desired block and inserting a snipline.
117  * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
118  * @param logService
119  * @param process
120  * @returns LogFragment[] containing a single LogFragment corresponding to each input file
121  */
122 const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => (
123     (await Promise.all(logFilesWithProgress.filter(({file}) => file.size > 0).map(({file, lastByte}) => {
124         const requestSize = file.size - lastByte;
125         if (requestSize > maxLogFetchSize) {
126             const chunkSize = Math.floor(maxLogFetchSize / 2);
127             const firstChunkEnd = lastByte+chunkSize-1;
128             return Promise.all([
129                 logService.getLogFileContents(process.containerRequest.uuid, file, lastByte, firstChunkEnd),
130                 logService.getLogFileContents(process.containerRequest.uuid, file, file.size-chunkSize, file.size-1)
131             ] as Promise<(LogFragment | undefined)>[]);
132         } else {
133             return Promise.all([logService.getLogFileContents(process.containerRequest.uuid, file, lastByte, file.size-1)]);
134         }
135     }))).filter((logResponseSet) => ( // Top level filter ensures array of LogFragments is not empty and contains 1 or more fragments containing log lines
136         logResponseSet.length && logResponseSet.some(logFragment => logFragment && logFragment.contents.length)
137     )).map((logResponseSet)=> {
138         // Remove fragments from subarrays that are undefined or have no lines
139         const responseSet = logResponseSet.filter((logFragment): logFragment is LogFragment => (
140             !!logFragment && logFragment.contents.length > 0
141         ));
142
143         // For any multi fragment response set, modify the last line of non-final chunks to include a line break and snip line
144         //   Don't add snip line as a separate line so that sorting won't reorder it
145         for (let i = 1; i < responseSet.length; i++) {
146             const fragment = responseSet[i-1];
147             const lastLineIndex = fragment.contents.length-1;
148             const lastLineContents = fragment.contents[lastLineIndex];
149             const newLastLine = `${lastLineContents}\n${SNIPLINE}`;
150
151             responseSet[i-1].contents[lastLineIndex] = newLastLine;
152         }
153
154         // Merge LogFragment Array (representing multiple log line arrays) into single LogLine[] / LogFragment
155         return responseSet.reduce((acc, curr: LogFragment) => ({
156             logType: curr.logType,
157             contents: [...(acc.contents || []), ...curr.contents]
158         }), {} as LogFragment);
159     })
160 );
161
162 const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): {filters: string[], logs: ProcessLogs} => {
163     const logs = groupLogs(logFiles, logFragments);
164     const filters = Object.keys(logs);
165     return { filters, logs };
166 }
167
168 /**
169  * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
170  * @param logFiles
171  * @param logFragments
172  * @returns ProcessLogs for the store
173  */
174 const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
175     const sortableLogFragments = mergeMultilineLoglines(logFragments);
176
177     const allLogs = mergeSortLogFragments(sortableLogFragments);
178     const mainLogs = mergeSortLogFragments(sortableLogFragments.filter((fragment) => (MAIN_EVENT_TYPES.includes(fragment.logType))));
179
180     const groupedLogs = logFragments.reduce((grouped, fragment) => ({
181         ...grouped,
182         [fragment.logType as string]: {lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents}
183     }), {});
184
185     return {
186         [MAIN_FILTER_TYPE]: {lastByte: undefined, contents: mainLogs},
187         [ALL_FILTER_TYPE]: {lastByte: undefined, contents: allLogs},
188         ...groupedLogs,
189     }
190 };
191
192 /**
193  * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
194  *   If there is no previous line (first line has no timestamp), the line is deleted.
195  *   Only used for combined logs that need sorting by timestamp after merging
196  * @param logFragments
197  * @returns Modified LogFragment[]
198  */
199 const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
200     logFragments.map((fragment) => {
201         // Avoid altering the original fragment copy
202         let fragmentCopy: LogFragment = {
203             logType: fragment.logType,
204             contents: [...fragment.contents],
205         }
206         // Merge any non-timestamped lines in sortable log types with previous line
207         if (fragmentCopy.contents.length && !NON_SORTED_LOG_TYPES.includes(fragmentCopy.logType)) {
208             for (let i = 0; i < fragmentCopy.contents.length; i++) {
209                 const lineContents = fragmentCopy.contents[i];
210                 if (!lineContents.match(LOG_TIMESTAMP_PATTERN)) {
211                     // Partial line without timestamp detected
212                     if (i > 0) {
213                         // If not first line, copy line to previous line
214                         const previousLineContents = fragmentCopy.contents[i-1];
215                         const newPreviousLineContents = `${previousLineContents}\n${lineContents}`;
216                         fragmentCopy.contents[i-1] = newPreviousLineContents;
217                     }
218                     // Delete the current line and prevent iterating
219                     fragmentCopy.contents.splice(i, 1);
220                     i--;
221                 }
222             }
223         }
224         return fragmentCopy;
225     })
226 );
227
228 /**
229  * Merges log lines of different types and sorts types that contain timestamps (are sortable)
230  * @param logFragments
231  * @returns string[] of merged and sorted log lines
232  */
233 const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
234     const sortableLines = fragmentsToLines(logFragments
235         .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType))));
236
237     const nonSortableLines = fragmentsToLines(logFragments
238         .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType)))
239         .sort((a, b) => (a.logType.localeCompare(b.logType))));
240
241     return [...sortableLines.sort(sortLogLines), ...nonSortableLines]
242 };
243
244 const sortLogLines = (a: string, b: string) => {
245     return a.localeCompare(b);
246 };
247
248 const fragmentsToLines = (fragments: LogFragment[]): string[] => (
249     fragments.reduce((acc, fragment: LogFragment) => (
250         acc.concat(...fragment.contents)
251     ), [] as string[])
252 );
253
254 const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
255     return logFiles.find((file) => (file.name.startsWith(key)))?.size
256 };
257
258 export const navigateToLogCollection = (uuid: string) =>
259     async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
260         try {
261             await services.collectionService.get(uuid);
262             dispatch<any>(navigateTo(uuid));
263         } catch {
264             dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Could not request collection', hideDuration: 2000, kind: SnackbarKind.ERROR }));
265         }
266     };
267
268 const ALL_FILTER_TYPE = 'All logs';
269
270 const MAIN_FILTER_TYPE = 'Main logs';
271 const MAIN_EVENT_TYPES = [
272     LogEventType.CRUNCH_RUN,
273     LogEventType.STDERR,
274     LogEventType.STDOUT,
275 ];
276
277 const PROCESS_PANEL_LOG_EVENT_TYPES = [
278     LogEventType.ARV_MOUNT,
279     LogEventType.CRUNCH_RUN,
280     LogEventType.CRUNCHSTAT,
281     LogEventType.DISPATCH,
282     LogEventType.HOSTSTAT,
283     LogEventType.NODE_INFO,
284     LogEventType.STDERR,
285     LogEventType.STDOUT,
286     LogEventType.CONTAINER,
287     LogEventType.KEEPSTORE,
288 ];
289
290 const NON_SORTED_LOG_TYPES = [
291     LogEventType.NODE_INFO,
292     LogEventType.CONTAINER,
293 ];